Kafka Streams: Automatic Creation of Internal Topics

Running the Kafka Streams program below threw the exception that follows it.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

Properties settings = new Properties();
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.128.74.83:9092");
StreamsConfig config = new StreamsConfig(settings);

// Topology: source_topic -> PROCESS1 (uses the in-memory COUNTS state store) -> sink_topic
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "source_topic")
       .addProcessor("PROCESS1", MyProcessor::new, "SOURCE")
       .addStateStore(Stores.create("COUNTS").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS1")
       .addSink("SINK3", "sink_topic", "PROCESS1");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Could not find partition info for topic: my-first-streams-application1-COUNTS1-changelog
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:174)
……

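According to the exception, the topic my-first-streams-application1-COUNTS1-changelog does not exist. This is an internal Kafka Streams topic that stores the changelog of a state store. Such changelog topics are named <application.id>-<store name>-changelog. Internal topics are supposed to be created automatically when the Kafka Streams application runs, so why is the topic reported as missing here?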
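To make the naming convention concrete, here is a minimal sketch of how a changelog topic name is put together from the application id and the state store name. The class and method names are hypothetical helpers, not part of the Kafka Streams API, and the input values are inferred from the topic name in the stack trace (application id my-first-streams-application1, store name COUNTS1).

```java
// Hypothetical helper for illustration only; not part of the Kafka Streams API.
final class ChangelogTopics {
    private ChangelogTopics() {}

    /** Builds "<application.id>-<store name>-changelog". */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Values inferred from the topic name reported in the exception.
        System.out.println(changelogTopic("my-first-streams-application1", "COUNTS1"));
        // prints my-first-streams-application1-COUNTS1-changelog
    }
}
```

Computing the expected name this way makes it easy to check with kafka-topics.sh whether the changelog topic was actually created.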

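This reminded me that kafka-topics.sh has to be given a ZooKeeper address, because topic metadata is stored in ZooKeeper. By the same reasoning, Kafka Streams should also need the ZooKeeper address in its configuration before it can create topics. After adding the following setting, the program ran: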

settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.128.74.83:2181");
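Since the missing setting only shows up later as a rebalance failure, one option is to check for it before starting the application. The following is a rough sketch of such a pre-flight check. The class and method are hypothetical, and it assumes that StreamsConfig.ZOOKEEPER_CONNECT_CONFIG resolves to the literal key "zookeeper.connect" in the Kafka Streams version used here.

```java
import java.util.Properties;

// Hypothetical pre-flight check for illustration; not part of Kafka Streams.
final class StreamsSettingsCheck {
    // Assumed literal key behind StreamsConfig.ZOOKEEPER_CONNECT_CONFIG.
    static final String ZOOKEEPER_CONNECT = "zookeeper.connect";

    private StreamsSettingsCheck() {}

    /** Throws if the settings carry no ZooKeeper address. */
    static void requireZookeeper(Properties settings) {
        String zk = settings.getProperty(ZOOKEEPER_CONNECT);
        if (zk == null || zk.trim().isEmpty()) {
            throw new IllegalStateException(
                "Missing " + ZOOKEEPER_CONNECT + ": internal topics cannot be created");
        }
    }
}
```

Calling StreamsSettingsCheck.requireZookeeper(settings) just before constructing StreamsConfig would turn the missing address into an immediate, descriptive error.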